Skip to content

Speed up LoadMode.DBT_LS by caching dbt ls output in Airflow Variable#1014

Merged
tatiana merged 84 commits into
mainfrom
cache-graph-json
Jun 25, 2024
Merged

Speed up LoadMode.DBT_LS by caching dbt ls output in Airflow Variable#1014
tatiana merged 84 commits into
mainfrom
cache-graph-json

Conversation

@tatiana
Copy link
Copy Markdown
Collaborator

@tatiana tatiana commented Jun 3, 2024

Improve significantly the LoadMode.DBT_LS performance. The example DAGs tested reduced the task queueing time significantly (from ~30s to ~0.5s) and the total DAG run time for Jaffle Shop from 1 min 25s to 40s (by more than 50%). Some users reported improvements of 84% in the DAG run time when trying out these changes. This difference can be even more significant on larger dbt projects.

The improvement was accomplished by caching the dbt ls output as an Airflow Variable. This is an alternative to #992, when we cached the pickled DAG/TaskGroup into a local file in the Airflow node. Unlike #992, this approach works well for distributed deployments of Airflow.

As with any caching solution, this strategy does not guarantee optimal performance on every run—whenever the cache is regenerated, the scheduler or DAG processor will experience a delay. It was also observed that the key value could change across platforms (e.g., Darwin and Linux). Therefore, if using a deployment with heterogeneous OS, the key may be regenerated often.

Closes: #990
Closes: #1061

Enabling/disabling this feature

This feature is enabled by default.
Users can disable it by setting the environment variable AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0.

How the cache is refreshed

Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key.

The cache will be automatically refreshed in case any files of the dbt project change. Changes are calculated using the SHA256 of all the files in the directory. Initially, this feature was implemented using the files' modified timestamp, but this did not work well for some Airflow deployments (e.g., astro --dags since the timestamp was changed during deployments).

Additionally, if any of the following DAG configurations are changed, we'll automatically purge the cache of the DAGs that use that specific configuration:

  • ProjectConfig.dbt_vars
  • ProjectConfig.env_vars
  • ProjectConfig.partial_parse
  • RenderConfig.env_vars
  • RenderConfig.exclude
  • RenderConfig.select
  • RenderConfig.selector

The following argument was introduced in case users would like to define Airflow variables that could be used to refresh the cache (it expects a list with Airflow variable names):

  • RenderConfig.airflow_vars_to_purge_cache

Example:

RenderConfig(
    airflow_vars_to_purge_cache==["refresh_cache"]
)

Cache key

The Airflow variables that represent the dbt ls cache are prefixed by cosmos_cache. When using DbtDag, the keys use the DAG name. When using DbtTaskGroup, they consider the TaskGroup and parent task groups and DAG.

Examples:

  1. The DbtDag "cosmos_dag" will have the cache represented by "cosmos_cache__basic_cosmos_dag".
  2. The DbtTaskGroup "customers" declared inside teh DAG "basic_cosmos_task_group" will have the cache key "cosmos_cache__basic_cosmos_task_group__customers".

Cache value

The cache values contain a few properties:

  • last_modified timestamp, represented using the ISO 8601 format.
  • version is a hash that represents the version of the dbt project and arguments used to run dbt ls by the time the cache was created
  • dbt_ls_compressed represents the dbt ls output compressed using zlib and encoded to base64 to be recorded as a string to the Airflow metadata database.

Steps used to compress:

        compressed_data = zlib.compress(dbt_ls_output.encode("utf-8"))
        encoded_data = base64.b64encode(compressed_data)
        dbt_ls_compressed = encoded_data.decode("utf-8")

We are compressing this value because it will be significant for larger dbt projects, depending on the selectors used, and we wanted this approach to be safe and not clutter the Airflow metadata database.

Some numbers on the compression

  • A dbt project with 100 models can lead to a dbt ls output of 257k characters when using JSON. Zlib could compress it by 20x.
  • Another real-life dbt project with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed. It reduces to 489 KB after being compressed using zlib and encoded using base64 - to 6% of the original size.
  • Maximum cell size in Postgres: 20MB

The latency used to compress is in the order of milliseconds, not interfering in the performance of this solution.

Future work

Example of results before and after this change

Task queue times in Astro before the change:
Screenshot 2024-06-03 at 11 15 26

Task queue times in Astro after the change on the second run of the DAG:
Screenshot 2024-06-03 at 11 15 44

This feature is available in astronomer-cosmos==1.5.0a8.

The previous screenshots were taken when trying out the alpha release using the following Astro CLI project:
https://github.com/astronomer/cosmos-demo

The same was reproduced by running the DAG using Airflow standalone.

@netlify
Copy link
Copy Markdown

netlify Bot commented Jun 3, 2024

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit 4dd9549
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/667abc9d99ea270008bbe57f

Comment thread cosmos/converter.py Outdated
Comment thread cosmos/dbt/graph.py Outdated
"""
logger.info(f"Trying to parse the dbt project using dbt ls cache {self.cache_identifier}...")
if settings.enable_cache and settings.experimental_cache:
dbt_ls_cache = Variable.get(self.cache_identifier, "")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to query the db directly, so you bypass any configured secrets backend.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jedcunningham how do you advise us to do this?
Wouldn't there be a risk that with this, we'd create a larger coupling of Cosmos to Airflow that could be more sensitive to different versions of Airflow?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jed, since this has been long standing, I'll be merging as it is - and I can make a follow up PR to address after your feedback.

@tatiana tatiana changed the title Support caching dbt ls cache in Airflow Variable WIP: Support caching dbt ls cache in Airflow Variable Jun 3, 2024
@codecov
Copy link
Copy Markdown

codecov Bot commented Jun 3, 2024

Codecov Report

Attention: Patch coverage is 99.51691% with 1 line in your changes missing coverage. Please review.

Project coverage is 96.05%. Comparing base (f983783) to head (cedb178).

Files Patch % Lines
cosmos/cache.py 98.73% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1014      +/-   ##
==========================================
+ Coverage   95.82%   96.05%   +0.22%     
==========================================
  Files          62       62              
  Lines        3020     3196     +176     
==========================================
+ Hits         2894     3070     +176     
  Misses        126      126              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@kzajaczkowski
Copy link
Copy Markdown

Looks promising!

@tatiana tatiana added area:performance Related to performance, like memory usage, CPU usage, speed, etc parsing:dbt_ls Issues, questions, or features related to dbt_ls parsing area:rendering Related to rendering, like Jinja, Airflow tasks, etc labels Jun 6, 2024
@tatiana tatiana force-pushed the cache-graph-json branch from 5a25d9d to cfed136 Compare June 10, 2024 11:05
@tatiana tatiana force-pushed the cache-graph-json branch from cfed136 to 9656788 Compare June 10, 2024 11:10
@tatiana tatiana force-pushed the cache-graph-json branch from e3988cb to 327192a Compare June 11, 2024 12:04
Copy link
Copy Markdown
Contributor

@pankajkoti pankajkoti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. There are some minor cosmetic suggestions for the documentation that we can address iteratively in a subsequent PR.

Great feature support! Thank you! 👏🏽

Comment thread .pre-commit-config.yaml
types-PyYAML,
types-attrs,
attrs,
types-pytz,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
types-pytz,

Would we still need it now?

Comment thread pyproject.toml
"pytest-cov",
"pytest-describe",
"sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
"types-pytz",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"types-pytz",

same


Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output
of this command as an `Airflow Variable <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html>`_.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.

Comment on lines +102 to +103
Caching the partial parse file
~~~~~~~~~~~~~
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Caching the partial parse file
~~~~~~~~~~~~~
Caching the partial parse file
~~~~~~~~~~~~~~~~~~~~~

Comment on lines +21 to +22
Caching the dbt ls output
~~~~~~~~~~~~~
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Caching the dbt ls output
~~~~~~~~~~~~~
Caching the dbt ls output
~~~~~~~~~~~~~~~~~~~

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE``

.. enable_cache_dbt_ls:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.. enable_cache_dbt_ls:
.. _enable_cache_dbt_ls:

@pankajkoti pankajkoti mentioned this pull request Jun 27, 2024
@dwreeves
Copy link
Copy Markdown
Collaborator

dwreeves commented Jun 27, 2024

@tatiana A little late to this PR, but not late to 1.5.0:

I'm thinking the cache_identifier and cache_dir should go in the render_config, since the render config is supposed to handle all the options for things like this.

I think, going into something like Cosmos 2.0, it makes sense to consolidate the DbtGraph() init function's signature to something simple like:

class DbtGraph:

    load_method_mapping: dict[LoadMode, Callable[[], None]] = {}

    def __init__(
            self,
            project_config: ProjectConfig | None = None,
            profile_config: ProfileConfig | None = None,
            execution_config: ExecutionConfig | None = None,
            render_config: RenderConfig | None = None,
            **kwargs
    ):

as per the code example in #895.

Adding more kwargs means either we cannot do this, or we need to add more deprecations.

I think doing this will also help make the API cleaner, more consistent, and easier for users to reason about. Right now there are just a lot of things for users to tweak and it can be overwhelming. Keeping it consistent and locked inside the configs can reduce confusion.

WDYT?

A few other notes:

  • It doesn't seem like DbtGraph.current_version is used for anything; it's never assigned or referenced anywhere in the code. Should we remove it?
  • DbtGraph.load_method is a little confusing to me. It seems like load_method is usually referred to 'statelessly' so to speak via an argument in the load() method, but here statefulness is implied. (It's not actually stateless because the load method called is actually inside self.render_config.load_method, so it's a bit circular).

@tatiana
Copy link
Copy Markdown
Collaborator Author

tatiana commented Jul 22, 2024

@tatiana A little late to this PR, but not late to 1.5.0:

Hi @dwreeves, Thanks a lot for all the very relevant feedback! I'm sorry I missed it, as it was made after the PR was merged.

I'm thinking the cache_identifier and cache_dir should go in the render_config, since the render config is supposed to handle all the options for things like this.

We can expose them there as part of a follow-up PR / future version of Cosmos. Just added: #1110

I'm also happy with the proposal to refactor the DbtGraph interface and #895. Would you be interested in working on this?

I created a PR to address the feedback on the dead code: #1111.

Your proposal to refactor the load implementation in #1001 is very good as well. Would you like to work on it?

@dwreeves
Copy link
Copy Markdown
Collaborator

Your proposal to refactor the load implementation in #1001 is very good as well. Would you like to work on it?

I would, but I've been busy... still trying to find time to contribute to this project. ☹️ The time will eventually come.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:config Related to configuration, like YAML files, environment variables, or executer configuration area:performance Related to performance, like memory usage, CPU usage, speed, etc area:rendering Related to rendering, like Jinja, Airflow tasks, etc dbt:list Primarily related to dbt list command or functionality lgtm This PR has been approved by a maintainer parsing:dbt_ls Issues, questions, or features related to dbt_ls parsing size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Introduce ClearCosmosVariableCache to clear cosmos cache in Airflow variables Reduce task queueing latency when using Cosmos

5 participants